package com.demo.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * Delayed-queue example: declares a delayed exchange, a durable queue bound to it,
 * and the JSON-based template / listener infrastructure used to publish and consume.
 *
 * <p>The exchange type {@code x-delayed-message} is not a built-in RabbitMQ type; it is
 * provided by the RabbitMQ delayed-message-exchange plugin, which must be enabled on the
 * broker for the exchange declaration to succeed.
 *
 * <p>NOTE(review): the {@code @Configuration} annotation below is intentionally left
 * commented out, exactly as in the original; presumably this example is disabled so it
 * does not clash with other demo configurations - confirm before enabling.
 *
 * @author jie.luo
 * @since 2021/3/1
 */
//@Configuration
public class RabbitMqDelayedConfig {

    /** Name of the delayed exchange that order messages are published to. */
    public static final String ORDER_EXCHANGE = "order.ex";

    /** Name of the durable queue that receives order messages once their delay expires. */
    public static final String ORDER_QUEUE = "order.queue";

    /** Routing key that binds {@link #ORDER_QUEUE} to {@link #ORDER_EXCHANGE}. */
    public static final String ORDER_ROUTING_KEY = "key.order";

    /** Exchange type contributed by the delayed-message-exchange plugin. */
    private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";

    /** Exchange argument naming the routing behaviour applied after the delay. */
    private static final String DELAYED_TYPE_ARGUMENT = "x-delayed-type";

    /** Number of consumers each listener container starts with. */
    private static final int CONCURRENT_CONSUMERS = 10;

    /** Upper bound on consumers each listener container may scale up to. */
    private static final int MAX_CONCURRENT_CONSUMERS = 20;

    /**
     * Declares the durable, non-auto-delete delayed exchange.
     *
     * @return a custom exchange of type {@code x-delayed-message} that routes like a
     *         direct exchange
     */
    @Bean
    public Exchange orderExchangeXd() {
        // Mark the exchange as a delayed exchange that behaves as a direct exchange.
        // A mutable map is used on purpose: the exchange keeps this map as its arguments.
        Map<String, Object> arguments = new HashMap<>();
        arguments.put(DELAYED_TYPE_ARGUMENT, ExchangeTypes.DIRECT);
        return new CustomExchange(ORDER_EXCHANGE, DELAYED_EXCHANGE_TYPE, true, false, arguments);
    }

    /**
     * Declares the order queue.
     *
     * @return a durable, non-exclusive, non-auto-delete queue without extra arguments
     */
    @Bean
    public Queue orderQueueXd() {
        return new Queue(ORDER_QUEUE, true, false, false);
    }

    /**
     * Binds the order queue to the delayed exchange.
     *
     * @return the binding of {@link #ORDER_QUEUE} to {@link #ORDER_EXCHANGE} under
     *         {@link #ORDER_ROUTING_KEY}, with no binding arguments
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueueXd())
                .to(orderExchangeXd())
                .with(ORDER_ROUTING_KEY)
                .noargs();
    }

    /**
     * Creates the admin used to declare exchanges, queues and bindings on the broker.
     *
     * @param connectionFactory connection factory used to reach the broker
     * @return a {@link RabbitAdmin} backed by {@code connectionFactory}
     */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates a template that serializes outgoing payloads as JSON.
     *
     * @param connectionFactory connection factory used to reach the broker
     * @param objectMapper      Jackson mapper used for JSON (de)serialization
     * @return a {@link RabbitTemplate} configured with the JSON message converter
     */
    @Bean
    public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // Obtain the converter the same way the listener factory does, so publishing
        // and consuming cannot drift apart in how the converter is built.
        rabbitTemplate.setMessageConverter(messageConverter(objectMapper));
        return rabbitTemplate;
    }

    /**
     * Creates the JSON message converter.
     *
     * @param objectMapper Jackson mapper used for JSON (de)serialization
     * @return a {@link Jackson2JsonMessageConverter} backed by {@code objectMapper}
     */
    @Bean
    public MessageConverter messageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    /**
     * Creates the listener container factory ({@link RabbitListenerContainerFactory})
     * used by message listeners.
     *
     * <p>The concrete factory type is declared as the return type instead of the raw
     * interface, so no raw generic type is exposed and the container can see the full
     * bean type. Acknowledgement is manual: listeners are responsible for acking or
     * rejecting each delivery themselves.
     *
     * @param connectionFactory connection factory used to reach the broker
     * @param objectMapper      Jackson mapper used for JSON (de)serialization
     * @return the configured listener container factory
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, ObjectMapper objectMapper) {
        // Original author's note: by default the factory converts a message to String when
        // its content_type contains "text", and treats messages without a content_type as
        // byte[]; the JSON converter is installed here instead.
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter(objectMapper));
        // Use manual message acknowledgement.
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Initial number of concurrent consumers.
        factory.setConcurrentConsumers(CONCURRENT_CONSUMERS);
        // Maximum number of concurrent consumers.
        factory.setMaxConcurrentConsumers(MAX_CONCURRENT_CONSUMERS);
        return factory;
    }
}
